support Ray actor AgentLoop and move judger resource config into AgentLoop #1663

YanhuiDua merged 6 commits into InternLM:rl_design
Conversation
Force-pushed from 512cefc to be1cb06
@claude review
```python
        if isinstance(config, JudgerConfig):
            judger_dict[key] = config.build()
        elif callable(config):
            judger_dict[key] = config
        else:
            raise ValueError(f"Invalid judger config type: {type(config)} for key {key}")
    return judger_dict
elif isinstance(self.judger_config, JudgerConfig):
    return self.judger_config.build()
elif callable(self.judger_config):
    return self.judger_config
else:
    raise ValueError(f"Invalid judger config type: {type(self.judger_config)}")
```
Claude: Warning — `build_judger()` calls `JudgerConfig.build()` without `pg` / `start_bundle_idx`.

`JudgerConfig.build()` accepts an optional `PlacementGroup` for resource allocation (native.py). When called without `pg`, it creates its own placement group per judger instance.

Previously, the trainer built a single judger and shared it between the train and eval agent loop managers. Now each `AgentLoopConfig.build()` calls `build_judger()` independently, so if the same `judger_config` is used in both train and eval configs, two separate judger instances (and potentially two sets of Ray actors for router-type judgers) are created.

This is likely fine for `judger_type="native"` (no Ray actors), but could double resource consumption for the "router" or "ray.actor" types. Please verify this is the intended behavior, or consider accepting an optional pre-built judger to allow sharing.
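One way to act on the last suggestion is to let the build path accept a pre-built judger and reuse it. This is a minimal sketch of that idea, not the actual xtuner API: `_FakeJudger`, the `prebuilt` parameter, and the standalone `build_judger` function are all hypothetical names for illustration.

```python
class _FakeJudger:
    """Stand-in for a built judger (e.g. a router backed by Ray actors)."""


def build_judger(judger_config, prebuilt=None):
    # Reuse the caller-supplied instance when given; otherwise build fresh.
    # With sharing, train and eval configs holding the same judger_config
    # no longer spawn two sets of Ray actors.
    if prebuilt is not None:
        return prebuilt
    return _FakeJudger()


shared = build_judger(judger_config=None)
train_judger = build_judger(judger_config=None, prebuilt=shared)
eval_judger = build_judger(judger_config=None, prebuilt=shared)
assert train_judger is eval_judger  # one instance shared across managers
```

The trainer would build the judger once and thread it through both `AgentLoopConfig.build()` calls; callers that pass nothing keep the current per-config behavior.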
```diff
  assert judger_response is not None, "Reward handler did not return a response."
- # native postprocess
+ assert isinstance(judger_response, dict), (
+     f"Reward handler must return a dict, but got {type(judger_response)}."
```
Claude: Nit — good addition. This assertion replaces the `# native postprocess` comment with a concrete runtime validation that the reward handler returns a dict. Much clearer contract.

Minor: for consistency with the new `TypeError` in `agent_loop.py:judge_sample` (lines 105–106), you could use `TypeError` here instead of `AssertionError` (assertions can be stripped with `-O`). But this is a very minor point and not blocking.
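The suggested swap could look like the sketch below (the `validate_response` helper name is illustrative, not from the PR): an explicit `isinstance` check raising `TypeError` keeps working when Python runs with `-O`, which strips `assert` statements.

```python
def validate_response(judger_response):
    # Explicit TypeError check: still enforced under `python -O`,
    # unlike an assert statement, which -O compiles away.
    if not isinstance(judger_response, dict):
        raise TypeError(
            f"Reward handler must return a dict, but got {type(judger_response)}."
        )
    return judger_response
```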
Claude: ## Summary

One Warning and one Nit, detailed above.

Verdict: COMMENT — no blocking issues, but the resource duplication for router-type judgers (warning above) deserves confirmation.
Force-pushed from be1cb06 to 51f321c, then from 51f321c to ad14668
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Force-pushed from ad14668 to 534868d
@claude review, and give some advice on this PR's code design
```diff
  @dataclass(frozen=True)
  class _TaskRunner:
      task_name: str
-     agent_loop: AgentLoop
+     agent_loop: AgentLoopSpec
      produce_strategy: ProduceStrategy
```
Claude: Critical — same `rollout_ctl` attribute access issue as in `producer.py`.

`_TaskRunner.agent_loop` was widened from `AgentLoop` to `AgentLoopSpec`, but `AgentLoopManager.produce_batch()` (unchanged in this PR) still accesses `task.agent_loop.rollout_ctl` directly as a plain attribute. This will raise `AttributeError` when the agent loop is a `RayAgentLoopProxy`.

Since `produce_batch` is the main entry point for the training loop, this is a runtime crash waiting to happen for any `ray.actor` agent loop configuration.

The `RouterAgentLoop` path is fine (it stores `rollout_ctl` as an instance attribute), but the single-actor `RayAgentLoopProxy` path is broken.
```diff
  async def _cleanup_pending_tasks(
-     self, pending_tasks: set, agent_loop: AgentLoop, replay_buffer: ReplayBuffer, task_name: str
+     self, pending_tasks: set, agent_loop: AgentLoopSpec, replay_buffer: ReplayBuffer, task_name: str
```
Claude: Critical — `agent_loop.rollout_ctl` will crash for `RayAgentLoopProxy`.

The type was widened from `AgentLoop` to `AgentLoopSpec`, and `_timed_generate_group` (above) was correctly updated with an `isinstance(agent_loop, ray.actor.ActorHandle)` branch. But this method still accesses `.rollout_ctl` as a plain attribute on line 203.

On a `RayAgentLoopProxy` (Ray actor proxy), this will raise `AttributeError` at runtime, since attribute access doesn't forward to the remote object. You'd need `await agent_loop.get_rollout_ctl.remote()` (using the method defined in `AgentLoopActor`).

This will surface whenever `AgentLoopConfig(type="ray.actor", num_ray_actors=1)` is used with the async produce strategy.
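The fix Claude describes is a branch on the handle type before touching `rollout_ctl`. The sketch below mimics the pattern without requiring Ray: `FakeActorHandle` and `FakeRemoteMethod` are stand-ins for `ray.actor.ActorHandle` and its `.remote()` call convention, and the helper names are illustrative.

```python
import asyncio


class FakeRemoteMethod:
    """Mimics a Ray actor method: .remote() returns an awaitable result."""

    def __init__(self, value):
        self._value = value

    def remote(self):
        async def _call():
            return self._value
        return _call()


class FakeActorHandle:
    """Stands in for a RayAgentLoopProxy actor handle."""

    def __init__(self, rollout_ctl):
        self.get_rollout_ctl = FakeRemoteMethod(rollout_ctl)


class LocalAgentLoop:
    def __init__(self, rollout_ctl):
        self.rollout_ctl = rollout_ctl


async def fetch_rollout_ctl(agent_loop):
    # Plain attribute access does not forward to a remote actor, so the
    # proxy path must go through the remote getter instead.
    if isinstance(agent_loop, FakeActorHandle):
        return await agent_loop.get_rollout_ctl.remote()
    return agent_loop.rollout_ctl


assert asyncio.run(fetch_rollout_ctl(LocalAgentLoop("ctl"))) == "ctl"
assert asyncio.run(fetch_rollout_ctl(FakeActorHandle("ctl"))) == "ctl"
```

In the real code the `isinstance` check would be against `ray.actor.ActorHandle`, matching the branch already added in `_timed_generate_group`.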
```python
def _build_worker(self, pg: PlacementGroup | None = None, bundle_idx: int = 0) -> RayJudgerProxy:
    pg_options = {"num_cpus": self.num_cpus_per_actor, "memory": self.cpu_memory_per_actor}
    if pg is None:
        # NOTE: Keep consistent with the router build logic; create a PlacementGroup by default.
        from xtuner.v1.rl.utils.ray_worker import CPUResourcesConfig

        cpu_resource_cfg = CPUResourcesConfig(
            num_workers=self.num_ray_actors,
            num_cpus_per_worker=self.num_cpus_per_actor,
            cpu_memory_per_worker=self.cpu_memory_per_actor,
        )
        pg = cpu_resource_cfg.build_placement_group()
        ray.get(pg.ready())
        bundle_idx = 0

    assert len(pg.bundle_specs) > bundle_idx, "Placement group does not have enough bundles for ray actor."
    assert pg.bundle_specs[bundle_idx].get("CPU", 1) >= self.num_cpus_per_actor, (
        f"Placement group bundle {bundle_idx} does not have enough CPU resources."
    )
    assert pg.bundle_specs[bundle_idx].get("memory", 0) >= self.cpu_memory_per_actor, (
        f"Placement group bundle {bundle_idx} does not have enough memory resources."
    )
    return RayJudger.options(
        placement_group=pg,
        placement_group_bundle_index=bundle_idx,
        **pg_options,
    ).remote(
```

```python
def get_num_placement_group_bundles(self) -> int:
    if self.judger_type == "native":
        return 0
    return self.num_ray_actors

def get_cpu_bundles(self) -> list[dict[str, float | int]]:
    return [
        {
            "CPU": self.num_cpus_per_actor,
            "memory": self.cpu_memory_per_actor,
        }
```
Claude: Warning — `RouterJudger` is now silently wrapped as a Ray actor, which changes error semantics.

Previously, `RouterJudger` was a local Python object with an asyncio lock for load balancing. This PR wraps it as a Ray actor via `CPUActorLauncher.build_actor(RouterJudger, ...)`, which means:

- Exceptions inside `RouterJudger.judge` will now be wrapped in `ray.exceptions.RayTaskError` instead of propagating directly.
- The `asyncio.Lock` now coordinates within the Ray actor's event loop (not the caller's), which is fine but the locking semantics change subtly under high concurrency.
- The `actor_num_cpus=0, actor_memory=0` means this actor has no reserved resources — it could be starved if the cluster is fully allocated.

Point 3 is especially worth considering: a zero-resource actor relies on Ray's lenient scheduling but could be deprioritized or fail to schedule in a resource-constrained cluster.
Claude: ## Review Summary

This PR introduces Critical, Warning, and Nit findings, detailed above.

Verdict: REQUEST_CHANGES — see the Critical findings above.
```python
model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
hf_checkpoint: str
sample_params: SampleParams
type: Literal["local", "ray.actor"] = "local"
```
I think this field can be removed. Instead, default `num_ray_actors` to 0 to mean that `ray.actor` is not used; that is simpler, with one fewer parameter. Setting it to n launches n actors.
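The reviewer's suggestion could be sketched like this (a plain `dataclass` stand-in for the pydantic config above; the `use_ray_actor` property is a hypothetical helper, not part of the PR):

```python
from dataclasses import dataclass


@dataclass
class AgentLoopConfigSketch:
    """Sketch: drop the `type` literal and infer the mode from num_ray_actors."""

    hf_checkpoint: str
    num_ray_actors: int = 0  # 0 -> run locally; n > 0 -> launch n Ray actors

    @property
    def use_ray_actor(self) -> bool:
        return self.num_ray_actors > 0


assert not AgentLoopConfigSketch("ckpt").use_ray_actor
assert AgentLoopConfigSketch("ckpt", num_ray_actors=4).use_ray_actor
```

One integer then encodes both the mode and the actor count, so the `type` field and `num_ray_actors` can never disagree.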
```python
    rollout_ctl=rollout_controller,
)

def build_ray_actor_list(
```
```python
judger = self.judger
if isinstance(judger, dict):
    if len(judger) > 1:
        raise NotImplementedError("Multiple judgers require a custom AgentLoop.judge_sample implementation.")
```
This kind of check should be moved up to the config construction stage; it should not fail only at runtime.
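Moving the check forward could look like the sketch below: a `dataclass` stand-in (the real config is pydantic-based, where a `model_validator` would play the same role) that rejects multi-judger dicts as soon as the config object is created.

```python
from dataclasses import dataclass, field


@dataclass
class JudgedLoopConfigSketch:
    """Sketch: validate the judger mapping when the config is built."""

    judger_config: dict = field(default_factory=dict)

    def __post_init__(self):
        # Fail fast at config construction instead of deep inside judge_sample.
        if isinstance(self.judger_config, dict) and len(self.judger_config) > 1:
            raise NotImplementedError(
                "Multiple judgers require a custom AgentLoop.judge_sample implementation."
            )


JudgedLoopConfigSketch(judger_config={"math": object()})  # single judger: ok
```

With this, a misconfigured multi-judger setup is rejected before any Ray actors or rollout workers are launched.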
…tLoop (#1663)

* Introduce CPUActorLauncher infrastructure
* Refactor judger and agent loop core abstractions
* Migrate configs and tests to the new judger interface
* Fix rollout controller access and CPU actor launcher defaults
* mv judger_sample from agent_loop
* fix cpu pg

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
No description provided.